package edu.nuc.wordcount;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

import static org.apache.hadoop.mapreduce.Job.getInstance;

public class MyWordCount {
    static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        //Mapper<KETIN,VALUEIN,KEYOUT,VALUEOUT>
        //KEYIN:框架要传送给咱们写的map方法的输入参数key的数据类型
        //默认情况下，框架要传入的key是框架从待处理数据（文本文件）中，读取的某一行数据的偏移量
        //所以数据类型是Long(对于海量数据来说int不够用)
        //VALUE：框架要传送给咱们自己写的map方法的数据参数value的数据类型
        //所以是String类型，但是Long,String等java原生态的数据类型的序列化效率较低（网络只能传输二进制数据
        //所以要序列化），所以hadoop对数据类型进行了封装，有代替品：LongWritable、Text
        //map方法处理完数据之后，需要返回一个值(一个key,一个value)
        //KEYOUT:是map方法处理完数据之后返回的key的数据类型
        //VALUEOUT:是map方法处理完数据之后返回的value的数据类型
        //我们自定义map方法的调用规律：
        //maptask每读取一行数据，就调用一次map方法
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            //先将Text转台成String类型
            String[] words = line.split("\\s+");
            for (String word : words) {
                context.write(new Text(word), new LongWritable(1));
                //将单词作为key，将1作为value,以便于后续数据分发
                //map端的输出数据：
                //<hello,1>
                //<world,1>
            }
        }
    }

    static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        //reduce方法要接收的输入参数有两个：一个key,一个value(它是迭代器,Iterable<T>value)
        //用迭代器是因为在数据量很大的情况下，迭代器不占内存
        //所有key相同的数据都会给一个reduce
        //KEYIN:框架要传递给reduce方法的输入参数的KEY的输入类型--对应map方法输出的key的类型
        //VALUEIN:框架要传输给reduce方法的输入参数的VALUE的类型--对应map方法的输出value类型
        //KEYOUT:reduce方法处理完成后的输出结果的key的类型
        //VALUEOUT:reduce方法处理完成后的输出结果的value的类型(这里要输出 hello 3 这种格式)
        //reduce方法的调用规律:框架从map端的输出结果中，挑选出所有的key相同的一组key-value数据对
        //组成一组，然后每一组调用一次reduce方法
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            //接收到的数据:
            //注:是按key的字典顺序排序（升序）
            //<aaa,1>
            //<bbb,1>
            //<hello,1>
            //<hello,1>
            //<hrllo1,1>
            //...
            //传入参数，是一组相同单词key的kv键值对
            //value是若干相同key的所有value的集合
            //<hello,1,1,1,1>
            long count = 0;//定义变量累加单词出现的次数
            for (LongWritable v : values) {
                count = count + v.get();//v.get()返回值为Long
                //次数不一定是1，这样写比较保险
            }
            context.write(key, new LongWritable(count));
            //将数据输出
        }

    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        //获取一个配置文件
        conf.set("fs.defaultFS", "file:///");
        //在本地运行
        System.setProperty("HADOOP_USER_NAME", "root");
        conf.set("mapreduce.framework.name", "local");
        Job job = getInstance(conf, "wordcount");
        //wordcount为job的名字

        job.setJarByClass(MyWordCount.class);
        //指定job的路径
        job.setMapperClass(MyMapper.class);
        //指定本业务job要使用的Mapper类
        job.setReducerClass(MyReducer.class);
        //指定本业务job要使用的Reducer类

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //map端write方法输出的类型

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //reduce端write方法输出的类型

        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\Administrator\\Desktop\\Code\\learn_hadoop\\input.txt"));
        //指定job的输入路径
        FileOutputFormat.setOutputPath(job, new Path("E:\\output\\output6"));
        //指定job的输出路径

        job.setNumReduceTasks(3);
        //设置reducetask的个数，默认是1
        boolean b = job.waitForCompletion(true);
        //提交job,true表示显示程序的详细信息
        System.exit(b ? 0 : 1);
        //0表示正常退出
    }

}
